/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */

package org.apache.storm.scheduler.multitenant;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A pool of machines that can be used to run isolated topologies.
 *
 * <p>For every topology added to the pool, the set of nodes it runs on is tracked. A topology whose conf
 * sets {@link Config#TOPOLOGY_ISOLATED_MACHINES} is recorded as isolated: its nodes are not counted by
 * {@link #nodesAvailable()} / {@link #slotsAvailable()} and are never handed out by the {@code take*} methods.
 * Nodes of the remaining (non-isolated) topologies in this pool can be taken back.
 */
public class IsolatedPool extends NodePool {
    private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class);
    /** Nodes currently held for each topology in this pool, keyed by topology id. */
    private final Map<String, Set<Node>> topologyIdToNodes = new HashMap<>();
    /** Details of every topology in this pool, keyed by topology id. */
    private final Map<String, TopologyDetails> tds = new HashMap<>();
    /** Ids of the topologies whose conf sets {@link Config#TOPOLOGY_ISOLATED_MACHINES}. */
    private final Set<String> isolated = new HashSet<>();
    /** Upper bound on the number of nodes this pool may hold. */
    private final int maxNodes;
    /** Number of nodes currently held by topologies of this pool. */
    private int usedNodes;

    /**
     * Create an empty pool.
     *
     * @param maxNodes the maximum number of nodes the pool is allowed to use
     */
    public IsolatedPool(int maxNodes) {
        this.maxNodes = maxNodes;
        usedNodes = 0;
    }

    /**
     * Register a topology with this pool, claiming every node it is currently assigned to.
     *
     * @param td the topology to add
     */
    @Override
    public void addTopology(TopologyDetails td) {
        String topId = td.getId();
        LOG.debug("Adding in Topology {}", topId);
        SchedulerAssignment assignment = cluster.getAssignmentById(topId);
        Set<Node> assignedNodes = new HashSet<>();
        if (assignment != null) {
            for (WorkerSlot ws : assignment.getSlots()) {
                // NOTE(review): assumes every assigned slot's node id is present in nodeIdToNode - confirm.
                Node n = nodeIdToNode.get(ws.getNodeId());
                assignedNodes.add(n);
            }
        }
        usedNodes += assignedNodes.size();
        topologyIdToNodes.put(topId, assignedNodes);
        tds.put(topId, td);
        if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
            isolated.add(topId);
        }
    }

    /**
     * Check whether a topology can be added to this pool.
     *
     * @param td the topology to check
     * @return false if any node the topology is assigned to runs more than one topology, true otherwise
     *     (including when the topology has no assignment yet)
     */
    @Override
    public boolean canAdd(TopologyDetails td) {
        //Only add topologies that are not sharing nodes with other topologies
        String topId = td.getId();
        SchedulerAssignment assignment = cluster.getAssignmentById(topId);
        if (assignment != null) {
            for (WorkerSlot ws : assignment.getSlots()) {
                Node n = nodeIdToNode.get(ws.getNodeId());
                if (n.getRunningTopologies().size() > 1) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Schedule every topology in this pool that needs it, taking nodes from the lesser pools
     * (and from non-isolated topologies of this pool) when required.
     *
     * @param lesserPools node pools we can steal nodes from
     */
    @Override
    public void scheduleAsNeeded(NodePool... lesserPools) {
        // Only the node sets stored as values are modified while iterating, never the map itself.
        for (Entry<String, Set<Node>> entry : topologyIdToNodes.entrySet()) {
            String topId = entry.getKey();
            Set<Node> allNodes = entry.getValue();
            TopologyDetails td = tds.get(topId);
            Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
            // Never ask for more machines than there are executors to place on them.
            Integer effectiveNodesRequested = null;
            if (nodesRequested != null) {
                effectiveNodesRequested = Math.min(td.getExecutors().size(), nodesRequested.intValue());
            }
            if (cluster.needsScheduling(td)
                    || (effectiveNodesRequested != null && allNodes.size() != effectiveNodesRequested)) {
                LOG.debug("Scheduling topology {}", topId);
                int slotsToUse;
                if (effectiveNodesRequested == null) {
                    slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools);
                } else {
                    slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools,
                                                        effectiveNodesRequested);
                }
                //No slots to schedule for some reason, so skip it.
                // The helper has already set an explanatory status; skipping keeps it from being overwritten below.
                if (slotsToUse <= 0) {
                    continue;
                }

                RoundRobinSlotScheduler slotSched =
                    new RoundRobinSlotScheduler(td, slotsToUse, cluster);

                LOG.debug("Nodes sorted by free space {}", allNodes);
                while (true) {
                    Node n = findBestNode(allNodes);
                    if (n == null) {
                        LOG.error("No nodes to use to assign topology {}", td.getName());
                        break;
                    }
                    if (!slotSched.assignSlotTo(n)) {
                        break;
                    }
                }
            }
            int nc = allNodes == null ? 0 : allNodes.size();
            cluster.setStatus(topId, "Scheduled Isolated on " + nc + " Nodes");
        }
    }

    /**
     * Pick the node to place the next worker on.
     *
     * <p>Only nodes with at least one free slot are considered. Among those the node with the fewest used
     * slots wins; ties are broken in favour of the node with more free slots, and a full tie keeps the node
     * encountered first.
     *
     * @param nodes the candidate nodes
     * @return the chosen node, or null if no candidate has a free slot
     */
    private Node findBestNode(Collection<Node> nodes) {
        Node best = null;
        for (Node node : nodes) {
            if (node.totalSlotsFree() <= 0) {
                continue;
            }
            if (best == null) {
                best = node;
            } else if (node.totalSlotsUsed() < best.totalSlotsUsed()) {
                best = node;
            } else if (node.totalSlotsUsed() == best.totalSlotsUsed()
                    && node.totalSlotsFree() > best.totalSlotsFree()) {
                best = node;
            }
        }
        return best;
    }

    /**
     * Build the status message used when a request would push this pool over {@code maxNodes}.
     *
     * @param additionalNodesNeeded how many nodes beyond the limit would be required
     * @return the status text
     */
    private String maxNodesExceededStatus(int additionalNodesNeeded) {
        return "Max Nodes("
                + maxNodes
                + ") for this user would be exceeded. "
                + additionalNodesNeeded
                + " more nodes needed to run topology.";
    }

    /**
     * Get the nodes needed to schedule an isolated topology.
     * @param td the topology to be scheduled
     * @param allNodes the nodes already scheduled for this topology.
     *     This will be updated to include new nodes if needed.
     * @param lesserPools node pools we can steal nodes from
     * @param nodesRequested the number of nodes the topology should end up running on
     * @return the number of additional slots that should be used for scheduling.
     */
    private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes,
                                       NodePool[] lesserPools, int nodesRequested) {
        String topId = td.getId();
        LOG.debug("Topology {} is isolated", topId);
        int nodesFromUsAvailable = nodesAvailable();
        int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools);

        int nodesUsed = topologyIdToNodes.get(topId).size();
        int nodesNeeded = nodesRequested - nodesUsed;
        LOG.debug("Nodes... requested {} used {} available from us {} "
                        + "avail from other {} needed {}",
                nodesRequested,
                nodesUsed,
                nodesFromUsAvailable,
                nodesFromOthersAvailable,
                nodesNeeded);
        // Nodes taken from this pool do not change usedNodes overall, so only the remainder counts against the limit.
        int remainingCapacity = maxNodes - usedNodes;
        if ((nodesNeeded - nodesFromUsAvailable) > remainingCapacity) {
            cluster.setStatus(topId,
                    maxNodesExceededStatus((nodesNeeded - nodesFromUsAvailable) - remainingCapacity));
            return 0;
        }

        //In order to avoid going over maxNodes I may need to steal from
        // myself even though other pools have free nodes. so figure out how
        // much each group should provide
        int nodesNeededFromOthers = Math.min(Math.min(remainingCapacity,
                                                      nodesFromOthersAvailable), nodesNeeded);
        int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers;
        LOG.debug("Nodes... needed from us {} needed from others {}",
                  nodesNeededFromUs, nodesNeededFromOthers);

        if (nodesNeededFromUs > nodesFromUsAvailable) {
            cluster.setStatus(topId, "Not Enough Nodes Available to Schedule Topology");
            return 0;
        }

        //Get the nodes
        Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, lesserPools);
        usedNodes += found.size();
        allNodes.addAll(found);
        // takeNodes decrements usedNodes for each node it releases; adding them back keeps the count balanced.
        Collection<Node> foundMore = takeNodes(nodesNeededFromUs);
        usedNodes += foundMore.size();
        allNodes.addAll(foundMore);

        int totalTasks = td.getExecutors().size();
        int origRequest = td.getNumWorkers();
        int slotsRequested = Math.min(totalTasks, origRequest);
        int slotsUsed = Node.countSlotsUsed(allNodes);
        int slotsFree = Node.countFreeSlotsAlive(allNodes);
        int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree);
        if (slotsToUse <= 0) {
            // if # of workers requested is more than we currently have
            if (origRequest > slotsUsed) {
                cluster.setStatus(topId,
                        "Running with fewer slots than requested "
                                + slotsUsed
                                + "/"
                                + origRequest
                                + " on "
                                + allNodes.size()
                                + " node(s) with "
                                + (slotsUsed + slotsFree)
                                + " total slots");
            } else {
                // if # of workers requested is less than we took
                // then we know some workers we track died, since we have more workers than we are supposed to have
                cluster.setStatus(topId, "Node has partially crashed, if this situation persists rebalance the topology.");
            }
        }
        return slotsToUse;
    }

    /**
     * Get the nodes needed to schedule a non-isolated topology.
     * @param td the topology to be scheduled
     * @param allNodes the nodes already scheduled for this topology.
     *     This will be updated to include new nodes if needed.
     * @param lesserPools node pools we can steal nodes from
     * @return the number of additional slots that should be used for scheduling.
     */
    private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes,
                                          NodePool[] lesserPools) {
        String topId = td.getId();
        LOG.debug("Topology {} is not isolated", topId);
        int totalTasks = td.getExecutors().size();
        int origRequest = td.getNumWorkers();
        int slotsRequested = Math.min(totalTasks, origRequest);
        int slotsUsed = Node.countSlotsUsed(topId, allNodes);
        int slotsFree = Node.countFreeSlotsAlive(allNodes);
        //Check to see if we have enough slots before trying to get them
        int slotsAvailable = 0;
        if (slotsRequested > slotsFree) {
            slotsAvailable = NodePool.slotsAvailable(lesserPools);
        }
        int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
        LOG.debug("Slots... requested {} used {} free {} available {} to be used {}",
                  slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse);
        if (slotsToUse <= 0) {
            cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
            return 0;
        }
        int slotsNeeded = slotsToUse - slotsFree;
        int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
        LOG.debug("Nodes... new {} used {} max {}",
                  numNewNodes, usedNodes, maxNodes);
        if ((numNewNodes + usedNodes) > maxNodes) {
            cluster.setStatus(topId, maxNodesExceededStatus(numNewNodes - (maxNodes - usedNodes)));
            return 0;
        }

        Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
        usedNodes += found.size();
        allNodes.addAll(found);
        return slotsToUse;
    }

    /**
     * Take up to {@code nodesNeeded} nodes away from the non-isolated topologies of this pool.
     * Every taken node has all of its slots freed and no longer counts towards {@code usedNodes}.
     *
     * @param nodesNeeded the number of nodes wanted
     * @return the nodes taken; may hold fewer than requested, and is empty when {@code nodesNeeded <= 0}
     */
    @Override
    public Collection<Node> takeNodes(int nodesNeeded) {
        LOG.debug("Taking {} from {}", nodesNeeded, this);
        HashSet<Node> ret = new HashSet<>();
        for (Entry<String, Set<Node>> entry : topologyIdToNodes.entrySet()) {
            if (!isolated.contains(entry.getKey())) {
                Iterator<Node> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    if (nodesNeeded <= 0) {
                        return ret;
                    }
                    Node n = it.next();
                    it.remove();
                    n.freeAllSlots(cluster);
                    ret.add(n);
                    nodesNeeded--;
                    usedNodes--;
                }
            }
        }
        return ret;
    }

    /**
     * Count the nodes that could be taken from this pool.
     *
     * @return the number of nodes held by non-isolated topologies
     */
    @Override
    public int nodesAvailable() {
        int total = 0;
        for (Entry<String, Set<Node>> entry : topologyIdToNodes.entrySet()) {
            if (!isolated.contains(entry.getKey())) {
                total += entry.getValue().size();
            }
        }
        return total;
    }

    /**
     * Count the slots that could be taken from this pool.
     *
     * @return the sum of {@code Node.countTotalSlotsAlive} over the nodes held by non-isolated topologies
     */
    @Override
    public int slotsAvailable() {
        int total = 0;
        for (Entry<String, Set<Node>> entry : topologyIdToNodes.entrySet()) {
            if (!isolated.contains(entry.getKey())) {
                total += Node.countTotalSlotsAlive(entry.getValue());
            }
        }
        return total;
    }

    /**
     * Take alive nodes away from the non-isolated topologies of this pool until their total slot count
     * covers {@code slotsNeeded}. Every taken node has all of its slots freed and no longer counts towards
     * {@code usedNodes}.
     *
     * @param slotsNeeded the number of slots wanted
     * @return the nodes taken; empty when {@code slotsNeeded <= 0}
     */
    @Override
    public Collection<Node> takeNodesBySlots(int slotsNeeded) {
        HashSet<Node> ret = new HashSet<>();
        // Nothing requested: do not evict a node just to discover that afterwards (mirrors takeNodes).
        if (slotsNeeded <= 0) {
            return ret;
        }
        for (Entry<String, Set<Node>> entry : topologyIdToNodes.entrySet()) {
            if (!isolated.contains(entry.getKey())) {
                Iterator<Node> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    Node n = it.next();
                    if (n.isAlive()) {
                        it.remove();
                        usedNodes--;
                        n.freeAllSlots(cluster);
                        ret.add(n);
                        slotsNeeded -= n.totalSlots();
                        if (slotsNeeded <= 0) {
                            return ret;
                        }
                    }
                }
            }
        }
        return ret;
    }

    /**
     * Report how many nodes and slots {@link #takeNodesBySlots(int)} would take for the same request,
     * without modifying the pool.
     *
     * @param slotsNeeded the number of slots wanted
     * @return the node and slot counts; both are zero when {@code slotsNeeded <= 0}
     */
    @Override
    public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
        int nodesFound = 0;
        int slotsFound = 0;
        // Keep the prediction in step with takeNodesBySlots, which takes nothing for a non-positive request.
        if (slotsNeeded <= 0) {
            return new NodeAndSlotCounts(nodesFound, slotsFound);
        }
        for (Entry<String, Set<Node>> entry : topologyIdToNodes.entrySet()) {
            if (!isolated.contains(entry.getKey())) {
                for (Node n : entry.getValue()) {
                    if (n.isAlive()) {
                        nodesFound++;
                        int nodeSlots = n.totalSlots();
                        slotsFound += nodeSlots;
                        slotsNeeded -= nodeSlots;
                        if (slotsNeeded <= 0) {
                            return new NodeAndSlotCounts(nodesFound, slotsFound);
                        }
                    }
                }
            }
        }
        return new NodeAndSlotCounts(nodesFound, slotsFound);
    }

    @Override
    public String toString() {
        return "IsolatedPool... ";
    }
}
